package spark练习题.练习题02_学生信息统计_RDD算子_SQL

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.junit.Test

class e02 {

  //配置环境
  val spark = SparkSession.builder().master("local[6]").appName("students").getOrCreate()

  import org.apache.spark.sql.functions._
  import spark.implicits._

  //读取数据
  val file = spark.sparkContext.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\data\\students")
  val data = spark.read.textFile("G:\\Projects\\IdeaProjects\\Spark_Competition\\dataset\\data\\students")
    .map {
      x =>
        var data = x.split(" ");
        (data(0), data(1), data(2).toInt, data(3), data(4), data(5).toInt)
    }.toDF("id", "name", "age", "sex", "subject", "score")

  // 展示数据集
  @Test
  def e00(): Unit = {
    data.show()
  }

  // 1.一共有多少个小于20岁的人参加考试？
  @Test
  def e01(): Unit = {
    val count = data.select('name, 'age)
      .where('age < 20)
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(2))
      }
        .filter(_._1.split("_")(2).toInt < 20).count())
  }

  // 2.一共有多少个等于20岁的人参加考试？
  @Test
  def e02(): Unit = {
    val count = data.select('name, 'age)
      .where('age === 20)
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(2))
      }
        .filter(_._1.split("_")(2).toInt == 20).count())
  }

  // 3.一共有多少个大于20岁的人参加考试？
  @Test
  def e03(): Unit = {
    val count = data.select('name, 'age)
      .where('age > 20)
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(2))
      }
        .filter(_._1.split("_")(2).toInt > 20).count())
  }

  // 4.一共有多个男生参加考试？
  @Test
  def e04(): Unit = {
    val count = data.select('name, 'sex, 'age)
      .where('sex === "男")
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(3))
      }
        .filter(_._1.split("_")(2).toString.equals("男")).count())
  }

  // 5.一共有多少个女生参加考试？
  @Test
  def e05(): Unit = {
    val count = data.select('name, 'sex, 'age)
      .where('sex === "女")
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(3))
      }
        .filter(_._1.split("_")(2).toString.equals("女")).count())
  }

  // 6.12班有多少人参加考试？
  @Test
  def e06(): Unit = {
    val count = data.select('id, 'name, 'sex)
      .where('id === "12")
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(3))
      }
        .filter(_._1.split("_")(0).toString.equals("12")).count())
  }

  // 7.13班有多少人参加考试？
  @Test
  def e07(): Unit = {
    val count = data.select('id, 'name, 'sex)
      .where('id === "13")
      .distinct()
      .show()

    println(
      file.groupBy {
        x =>
          val data = x.split(" ");
          (data(0) + "_" + data(1) + "_" + data(3))
      }
        .filter(_._1.split("_")(0).toString.equals("13")).count())
  }

  // 8.语文科目的平均成绩是多少？
  @Test
  def e08(): Unit = {
    val count = data.select('subject, 'score)
      .where('subject === "chinese")
      .agg(avg("score") as "chinese_avg")
      .show()
    println(
      file.filter(_.split(" ")(4).equals("chinese"))
        .map(_.split(" ")(5).toFloat)
        .mean()
    )
  }

  // 9.数学科目的平均成绩是多少？
  @Test
  def e09(): Unit = {
    val count = data.select('subject, 'score)
      .where('subject === "math")
      .agg(avg("score") as "math_avg")
      .show()
    println(
      file.filter(_.split(" ")(4).equals("math"))
        .map(_.split(" ")(5).toFloat)
        .mean()
    )
  }

  // 10.英语科目的平均成绩是多少？
  @Test
  def e10(): Unit = {
    val count = data.select('subject, 'score)
      .where('subject === "english")
      .agg(avg("score") as "english_avg")
      .show()
    println(
      file.filter(_.split(" ")(4).equals("english"))
        .map(_.split(" ")(5).toFloat)
        .mean()
    )
  }


  // 11.单个人平均成绩是多少？
  @Test
  def e11(): Unit = {
    val window = Window.partitionBy('name)
    val count = data.select('name, avg("score") over (window) as "avg_score")
      .distinct()
      .show()

    println(
      file.map {
        x =>
          var data = x.split(" ");
          ((data(0) + "_" + data(1)), data(5).toFloat)
      }.map(x => (x._1, (x._2, 1)))
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .map(x => (x._1, x._2._1 / x._2._2))
        .collect().mkString(",").foreach(print(_))
    )
  }


  // 12.12\13班平均成绩是多少？
  @Test
  def e12(): Unit = {
    val window = Window.partitionBy('id)
    val count = data.select('id, avg("score") over (window) as "avg_score")
      .distinct()
      .show()
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(0), data(5).toFloat)
      }.map(x => (x._1, (x._2, 1)))
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .map(x => (x._1, x._2._1 / x._2._2))
        .collect().mkString(",").foreach(print(_))
    )
  }

  // 13.12\13班男女生平均成绩是多少？
  @Test
  def e13(): Unit = {
    val window = Window.partitionBy('id, 'sex)
    val count = data.select('id, 'sex, avg("score") over (window) as "avg_score")
      .distinct()
      .show()
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(0) + "_" + data(3), data(5).toFloat)
      }.map(x => (x._1, (x._2, 1)))
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .map(x => (x._1, x._2._1 / x._2._2))
        .collect().mkString(",").foreach(print(_))
    )
  }

  // 14.全校语文、数学、英语成绩最高分是多少？
  @Test
  def e14(): Unit = {
    // 全校语文、数学、英语成绩最高分排表
    val window = Window.partitionBy('subject)
    val count = data.select('subject, max("score") over (window) as "max_score")
      .distinct()
      .show()
    // 全校语文成绩最高分
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(4), data(5).toFloat)
      }.filter(x => x._1.contains("chinese"))
        .max()
    )
    println(file.filter {
      _.split(" ")(4).equals("chinese")
    }.map(_.split(" ")(5)).max())
  }

  // 15.全校语文、数学、英语成绩每班最高最低分是多少？
  @Test
  def e15(): Unit = {
    // 各班各科成绩最大最小排列表
    val window = Window.partitionBy('id, 'subject)
    val count =
      data.select('id, 'subject, max("score") over (window) as "max_score",
        min("score") over (window) as "min_score")
        .sort('id, 'subject)
        .distinct()
        .show()
    // 12班语文成绩最低分
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(0) + "_" + data(4), data(5).toFloat)
      }.filter(x => x._1.contains("12_chinese"))
        .min()
    )
    //13班数学成绩最高分
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(0) + "_" + data(4), data(5).toFloat)
      }.filter(x => x._1.contains("13_math"))
        .max()
    )
  }

  // 16.总成绩大于150分的12班的女生有几个？
  @Test
  def e16(): Unit = {
    val window = Window.partitionBy('id, 'name)
    val count =
      data.select('id, 'name, 'age, 'subject, 'sex, 'score, sum("score") over (window) as "sum_score")
        .where('sum_score > 150 && 'id === "12" && 'sex === "女")
        .orderBy('id, 'name)
        .show()
    // 有shuffle
    println(
      file.map {
        x =>
          var data = x.split(" ");
          (data(0) + "_" + data(3), data(5).toFloat)
      }
        .filter(x => x._1.contains("12_女"))
        .reduceByKey(_ + _)
        .filter(_._2 > 150)
        .count()
    )
    // 无shuffle
    println(
      file.filter {
        x =>
          var datas = x.split(" ");
          datas(3).equals("女") && datas(0).equals("12")
      }
        .groupBy(_.split(" ")(1))
        .map {
          case (name, list) => {
            list.map(_.split(" ")(5).toFloat).sum
          }
        }
        .filter(_ > 150).count())

  }

  // 17.总成绩大于150分，且数学大于等于70，且年龄大于等于19岁的学生的平均成绩是多少？
  @Test
  def e17(): Unit = {
    val window = Window.partitionBy('id, 'name)
    val count =
      data.select('id, 'name, 'age, 'subject, 'sex, 'score, avg("score") over (window) as "avg_score",
        sum("score") over (window) as "sum_score")
        .where('sum_score > 150 && 'subject === "math" && 'score >= 70 && 'age >= 19)
        .orderBy('id, 'name)
        .show()
    // 思路1：
    val base = file.map {
      x =>
        var data = x.split(" ");
        (data(0) + "_" + data(1), (data(2).toInt, data(4), data(5).toFloat))
    }
    //过滤出总分大于150的,并求出平均成绩
    val data1 = base.map(x => (x._1, (x._2._3, 1)))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .filter(_._2._1 > 150)
      .map(x => (x._1, x._2._1 / x._2._2))
    //过滤出 数学大于等于70，且年龄大于等于19岁的学生
    val data2 =
      base.filter(x => x._2._1 >= 19 && x._2._2.equals("math") && x._2._3 >= 70.0)

    println(data1.join(data2).collect().mkString(","))


    //思路2：
    val data3 = file.map(x => {
      val line = x.split(" "); (line(0) + "," + line(1) + "," + line(3), line(5).toInt)
    })
    val data4 = file.map(x => {
      val line = x.split(" "); (line(0) + "," + line(1) + "," + line(3) + "," + line(4), line(5).toInt)
    })

    //过滤出总分大于150的,并求出平均成绩
    val com1 = data3
      .map(a => (a._1, (a._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .filter(x => (x._2._1 > 150))
      .map(t => (t._1, t._2._1 / t._2._2))

    //过滤出 数学大于等于70，且年龄大于等于19岁的学生
    val com2 = data4
      .filter(
        x => {
          val datas = x._1.split(",");
          datas(3).equals("math") && x._2 >= 70
        }
      )
      .map(
        x => {
          val datas = x._1.split(",");
          (datas(0) + "," + datas(1) + "," + datas(2), x._2.toInt)
        }
      )
    println(com1.join(com2).map(x => (x._1, x._2._1)).collect().mkString(","))

  }

}
